Spark Partition

Spark is an engine for parallel processing of data on a cluster. Parallelism in Apache Spark allows developers to perform tasks on hundreds of machines in a cluster in parallel and independently. RDDs are stored in partitions on different cluster nodes. 
  • A partition is basically a logical chunk of a large distributed data set.
  • Each partition is then sent to an executor to be processed. Only one partition is computed per executor thread at a time, so the size and number of partitions handed to an executor directly determine how long it takes to process them.
  • It provides the possibility to distribute the work across the cluster, divide the task into smaller parts, and reduce memory requirements for each node. 
  • The partition is the main unit of parallelism in Apache Spark. Spark partitions ≠ Hive partitions. Both are chunks of data, but Spark splits data in order to process it in parallel in memory, whereas a Hive partition lives in storage, on disk, in the persistence layer (the contrast is sketched after this list).

  • Once the user has submitted a job to the cluster, each partition is sent to a specific executor for further processing, one partition per executor thread at a time, as noted above.
  • Thus, the more partitions there are, the more the work is distributed across executors; with a smaller number of partitions the work is done in larger pieces (and often faster).
  • Resilient Distributed Datasets are collections of data items so large that they cannot fit on a single node and have to be partitioned across various nodes. Spark automatically partitions RDDs and distributes the partitions across different nodes. A partition in Spark is an atomic chunk of data (a logical division of the data) stored on a node in the cluster. Partitions are the basic units of parallelism in Apache Spark, and RDDs in Apache Spark are collections of partitions.
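A minimal sketch of that contrast, assuming an existing SparkSession named spark and purely hypothetical data, column names and output path: repartition changes Spark's in-memory partitions, while DataFrameWriter.partitionBy writes Hive-style directory partitions to disk.

import spark.implicits._

// Hypothetical example data
val sales = Seq(("US", 100), ("US", 250), ("DE", 75)).toDF("country", "amount")

// Spark (memory) partitions: how the data is split for in-memory parallel processing
val inMemory = sales.repartition(8)
println(inMemory.rdd.partitions.length) // 8

// Hive-style (disk) partitions: how the data is laid out in persistent storage,
// creating directories such as .../country=US/ and .../country=DE/
inMemory.write.partitionBy("country").parquet("/tmp/sales_by_country")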
How does Spark partition data files?
One main advantage of Spark is that it splits data into multiple partitions and executes operations on all partitions in parallel, which allows the job to complete faster. While working with partitioned data we often need to increase or decrease the number of partitions based on the data distribution; the repartition and coalesce methods let us do that. When the number of partitions is not specified programmatically or through configuration, Spark chooses it by default based on a number of factors, and those factors differ depending on where and in which mode you run your job.
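A minimal sketch of the difference, assuming an existing SparkSession named spark (created as in the next section): repartition performs a full shuffle and can increase or decrease the number of partitions, while coalesce avoids a full shuffle and is normally used only to decrease it.

val df = spark.range(0, 1000000)     // starts with the default number of partitions
println(df.rdd.partitions.length)

val more = df.repartition(50)        // full shuffle; can scale the count up or down
println(more.rdd.partitions.length)  // 50

val fewer = more.coalesce(10)        // merges existing partitions; no full shuffle
println(fewer.rdd.partitions.length) // 10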

Local Mode
When you run Spark locally (local mode), it partitions data into the number of CPU cores available on your system, or the value you specify when creating the SparkSession object:

import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
    .master("local[5]")
    .appName("spark")
    .getOrCreate()

The above example passes local[5] as an argument to the master() method, which means the job runs locally with 5 threads, so the default parallelism, and therefore the default number of partitions, is 5. Even if you have just 2 cores on your system, Spark still creates 5 partition tasks; the threads simply share the available cores.

val df = spark.range(0, 20)
println(df.rdd.partitions.length) // prints 5 when running with local[5]

Creating Partitions in Spark
Here’s a simple example that creates a list of 100 integers with 3 partitions –
val rdd = sc.parallelize(1 to 100, 3)
rdd.map(x => x).collect()

rdd.getNumPartitions // 3

Characteristics of Partition in Apache Spark
  • Every machine in a Spark cluster contains one or more partitions.
  • The number of partitions in Spark is configurable, and having too few or too many partitions is not good.
  • Partitions in Spark do not span multiple machines.
Partition in Apache Spark
One important way to increase the parallelism of Spark processing is to increase the number of executors on the cluster. However, knowing how the data should be distributed so that the cluster can process it efficiently is extremely important. The key to achieving this is partitioning in Spark. Apache Spark manages data through RDDs using partitions, which help parallelize distributed data processing with negligible network traffic for sending data between executors. By default, Apache Spark reads data into an RDD from the nodes that are close to it.

Communication is very expensive in distributed programming, so laying out data to minimize network traffic greatly helps performance. Just as a single-node program should choose the right data structure for a collection of records, a Spark program can control RDD partitioning to reduce communication. Partitioning in Spark is not helpful for all applications: if an RDD is scanned only once, there is little point in partitioning the data within it, but if a dataset is reused multiple times in key-oriented operations such as joins, then partitioning the data will help.

Partitioning is an important concept in Apache Spark, as it determines how the hardware resources are used when executing a job. In Apache Spark, by default a partition is created for every HDFS block, which is 128 MB by default. RDDs are automatically partitioned in Spark without human intervention; however, at times programmers may want to change the partitioning scheme by changing the size and number of partitions based on the requirements of the application. For custom partitioning, developers have to check the number of slots in the hardware and how many tasks an executor can handle in order to optimize performance and achieve parallelism.
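As a sketch of what such custom partitioning can look like on an RDD of key-value pairs, the hypothetical HotKeyPartitioner below routes one hot key to its own partition and hashes every other key across the rest; numPartitions and getPartition are the two methods the standard Partitioner API requires.

import org.apache.spark.Partitioner

// Hypothetical custom partitioner: isolate the hot key "key-a" in partition 0
// and hash every other key across the remaining partitions.
class HotKeyPartitioner(override val numPartitions: Int) extends Partitioner {
  require(numPartitions >= 2, "need one partition for the hot key and at least one for the rest")
  override def getPartition(key: Any): Int = key match {
    case "key-a" => 0
    case other   => 1 + math.abs(other.hashCode % (numPartitions - 1))
  }
}

val pairs = sc.parallelize(Seq(("key-a", 1), ("key-b", 2), ("key-c", 3)))
val custom = pairs.partitionBy(new HotKeyPartitioner(4))
println(custom.getNumPartitions) // 4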

Empty partition Problem
A filtering operation does not change the number of memory partitions in a DataFrame.

Suppose you have a data lake with 25 billion rows of data and 60,000 memory partitions. Suppose you run a filtering operation that results in a DataFrame with 10 million rows. After filtering, you’ll still have 60,000 memory partitions, many of which will be empty. You’ll need to run repartition() or coalesce() to spread the data on an appropriate number of memory partitions.

import org.apache.spark.sql.functions.col

val df = spark.read.parquet("/some/path") // 60,000 memory partitions
val filteredDF = df.filter(col("age") > 98) // still 60,000 memory partitions
// at this point, any operations performed on filteredDF will be super inefficient
val repartitionedDF = filteredDF.repartition(200) // down to 200 memory partitions


val path = new java.io.File("./src/test/resources/person_data.csv").getCanonicalPath
val df = spark
  .read
  .option("header", "true")
  .csv(path)
  .repartition(200)

println(df.rdd.partitions.size) // 200


val filteredDF = df.filter(col("person_country") === "Cuba")
println(filteredDF.rdd.partitions.size) // 200

There are only 5 rows of Cuba data and 200 memory partitions, so we know that at least 195 memory partitions are empty.

Having a lot of empty memory partitions significantly slows down analyses on production-sized datasets.

Selecting an appropriate number of memory partitions

Choosing the right number of memory partitions after filtering is difficult.

You can follow the 1GB per memory partition rule of thumb to estimate the number of memory partitions that’ll be appropriate for a filtered dataset.

Suppose you have 25 billion rows of data, which is 10 terabytes on disk (10,000 GB).

An extract with 500 million rows (2% of the total data) is probably around 200 GB of data (0.02 * 10,000), so 200 memory partitions should work well.
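As a rough sketch of that arithmetic (reusing the DataFrame df and the col function from the filtering example above, with the hypothetical figures from this section):

// 1 GB per memory partition rule of thumb, using the figures above
val totalSizeGB       = 10000.0  // 25 billion rows, roughly 10 TB on disk
val filteredFraction  = 0.02     // the extract keeps about 2% of the rows
val targetPartitionGB = 1.0

val estimatedSizeGB  = totalSizeGB * filteredFraction                       // ~200 GB
val targetPartitions = math.ceil(estimatedSizeGB / targetPartitionGB).toInt // ~200

val extractDF = df.filter(col("age") > 98).repartition(targetPartitions)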

How many partitions should a Spark RDD have?
Having too large a number of partitions or too few is not ideal. The number of partitions in Spark should be decided thoughtfully, based on the cluster configuration and the requirements of the application. Increasing the number of partitions makes each partition hold less data, or no data at all. Apache Spark can run a single concurrent task for every partition of an RDD, up to the total number of cores in the cluster. If a cluster has 30 cores, then programmers will want their RDDs to have at least 30 partitions, or maybe 2 or 3 times that.

As already mentioned above, one partition is created for each block of the file in HDFS, and each block is 128 MB by default. However, when creating an RDD a second argument can be passed that defines the number of partitions to be created for it.

val rdd = sc.textFile("file.txt", 5)

The above line of code creates an RDD named rdd from file.txt with 5 partitions. Suppose you have a cluster with four cores and assume that each partition takes 5 minutes to process. For the above RDD with 5 partitions, 4 partition tasks run in parallel, since there are four cores, and the 5th partition is processed after 5 minutes, once one of the 4 cores is free. The entire processing completes in 10 minutes, and while the 5th partition is being processed, the remaining 3 cores sit idle. The best way to decide on the number of partitions in an RDD is to make it equal to the number of cores in the cluster, so that all partitions are processed in parallel and the resources are used optimally. The number of partitions in a Spark RDD can always be found with the RDD's partitions method. For the RDD we created, it shows an output of 5 partitions:

scala> rdd.partitions.size
res0: Int = 5

If an RDD has too many partitions, then task scheduling may take more time than the actual execution. On the contrary, having too few partitions is also not beneficial, as some of the worker nodes could just be sitting idle, resulting in less concurrency. This can lead to improper resource utilization and data skew, i.e. data might be concentrated on a single partition and one worker node might be doing more work than the others. Thus, there is always a trade-off when deciding on the number of partitions.

Guidelines for the number of partitions in Spark are as follows (a small sketch follows this list).
  • The number of partitions usually lies between 100 and 10K; the lower and upper bounds should be determined based on the size of the cluster and the data.
  • The lower bound for Spark partitions is 2 × the number of cores in the cluster available to the application.
  • For the upper bound, each task should take 100+ ms to execute; if it takes less, the partitioned data is probably too small and the application is likely spending extra time scheduling tasks.
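A minimal sketch of the lower-bound guideline, assuming an existing SparkContext sc and a hypothetical input file; sc.defaultParallelism typically reflects the number of cores available to the application.

// Rule of thumb: at least 2 x the cores available to the application
val lowerBound = 2 * sc.defaultParallelism

val events = sc.textFile("events.txt") // hypothetical input
val wellSized =
  if (events.getNumPartitions < lowerBound) events.repartition(lowerBound) else events
println(wellSized.getNumPartitions)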
Types of Partition in Apache Spark
  • Hash Partitioning in Spark
  • Range Partitioning in Spark (both are sketched briefly below)
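A minimal sketch of both types on hypothetical data: HashPartitioner assigns a key to a partition based on its hash code, while range partitioning (RangePartitioner on RDDs, repartitionByRange on DataFrames) samples the keys and keeps contiguous key ranges together.

import org.apache.spark.{HashPartitioner, RangePartitioner}
import org.apache.spark.sql.functions.col

val pairs = sc.parallelize((1 to 1000).map(i => (i, s"value-$i")))

// Hash partitioning: partition chosen from the key's hash code
val hashed = pairs.partitionBy(new HashPartitioner(8))
println(hashed.partitioner) // Some(HashPartitioner)

// Range partitioning: contiguous key ranges end up in the same partition
val ranged = pairs.partitionBy(new RangePartitioner(8, pairs))
println(ranged.partitioner) // Some(RangePartitioner)

// DataFrame equivalent of range partitioning
val rangedDF = spark.range(0, 1000).repartitionByRange(8, col("id"))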
Partition Wisely
So how do you avoid skewed data and shuffle blocks? By partitioning wisely. It's critical to partition wisely in order to manage memory pressure and to ensure complete resource utilization on the executor nodes. You must always know your data: its size, its types, and how it's distributed. A couple of best practices to remember are:
  • Understanding and selecting the right operators, such as reduceByKey or aggregateByKey, so that your driver is not put under pressure and the tasks are properly executed on the executors (see the sketch after this list).
  • If your data arrives in a few large unsplittable files, the partitioning dictated by the InputFormat might place large numbers of records in each partition, while not generating enough partitions to take advantage of all the available cores. In this case, invoking repartition with a high number of partitions after loading the data will allow the operations that come after it to leverage more of the cluster’s CPU.
  • Also, if the data is skewed, repartitioning on an appropriate key that spreads the load evenly is recommended.
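A minimal sketch of the first and last points above, on a hypothetical word-count pair RDD: reduceByKey combines values within each partition before the shuffle, and repartitioning on the key with a HashPartitioner spreads downstream key-oriented work across the executors.

import org.apache.spark.HashPartitioner

val words = sc.textFile("corpus.txt") // hypothetical input
  .flatMap(_.split("\\s+"))
  .map(word => (word, 1))

// reduceByKey combines locally inside each partition before shuffling,
// so far less data crosses the network than with groupByKey
val counts = words.reduceByKey(_ + _)

// Repartition by key so that later key-oriented operations are spread evenly
val balanced = counts.partitionBy(new HashPartitioner(2 * sc.defaultParallelism))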
Skewed Data & Shuffle Blocks
Processing with Apache Spark's default partitioning might leave the data skewed, which in turn can cause problems related to shuffling during aggregation operations, or leave a single executor without sufficient memory.


Consider a case where “key-a” has a much larger amount of data in its partition, so the task on Exec-5 will take much longer to complete than the other five tasks. Another important thing to remember is that Spark shuffle blocks can be no greater than 2 GB (internally, because the ByteBuffer abstraction has a MAX_SIZE set to 2 GB). For example, if you are running an operation such as an aggregation, join or cache operation, a Spark shuffle will occur, and having a small number of partitions or skewed data can cause a shuffle block issue. Hence, if you start seeing errors about breaching the MAX_SIZE limit during a shuffle, you know why it's happening: it may be tied to skewed data. To divide data into partitions it first has to be stored, and Spark stores its data in the form of RDDs.
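A minimal sketch for spotting that kind of skew, using a hypothetical pair RDD where "key-a" dominates: counting the records in each partition shows immediately whether one partition is far larger than the rest.

import org.apache.spark.HashPartitioner

// Hypothetical skewed pair RDD: "key-a" carries almost all of the records
val skewed = sc.parallelize(
    Seq.fill(100000)(("key-a", 1)) ++ Seq(("key-b", 1), ("key-c", 1)))
  .partitionBy(new HashPartitioner(6))

// Count the records in each partition to see how unevenly the keys landed
val sizes = skewed
  .mapPartitionsWithIndex((idx, it) => Iterator((idx, it.size)))
  .collect()

sizes.foreach { case (idx, n) => println(s"partition $idx -> $n records") }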
